package com.atguigu.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.bean.TrafficPageViewBean;
import com.atguigu.utils.DateFormatUtil;
import com.atguigu.utils.KafkaUtil;
import com.atguigu.utils.MyClickHouseUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * DWS-layer Flink job that aggregates page-view traffic metrics per
 * (region {@code ar}, channel {@code ch}, new-visitor flag {@code is_new}, app version {@code vc})
 * in 10-second tumbling event-time windows and hands the results to a ClickHouse sink.
 *
 * <p>Metrics produced per window and key: unique visitors ({@code uvCt}), sessions ({@code svCt}),
 * page views ({@code pvCt}) and accumulated page dwell time ({@code durSum}).
 *
 * <p>Data flow:
 * {@code web/app -> log server (log files) -> Flume -> Kafka(ODS) -> FlinkApp -> Kafka(DWD) -> FlinkApp -> ClickHouse(DWS)}
 *
 * <p>Programs:
 * {@code Mock -> file -> f1.sh -> Kafka(ZK) -> BaseLogApp -> Kafka(ZK) -> Dws02TrafficVcChArIsNewPageViewWindow -> ClickHouse(ZK)}
 */
public class Dws02TrafficVcChArIsNewPageViewWindow {

    /**
     * Builds and runs the streaming pipeline.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {

        // Step 1: obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // For production the job should be started from a checkpoint or savepoint.
        // 2.1 Enable checkpointing every 5 seconds with exactly-once semantics.
        //env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        // 2.2 Set the checkpoint timeout to 1 minute.
        //env.getCheckpointConfig().setCheckpointTimeout(60 * 1000L);
        // 2.3 Set the minimum pause between two consecutive checkpoints.
        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        // 2.5 Configure the automatic restart strategy.
        //env.setRestartStrategy(RestartStrategies.failureRateRestart(
        //        3, Time.days(1L), Time.minutes(1L)
        //));
        // 2.6 Configure the state backend and checkpoint storage.
        //env.setStateBackend(new EmbeddedRocksDBStateBackend(true) );
        //env.getCheckpointConfig().setCheckpointStorage(
        //      "hdfs://hadoop102:8020/flinkCDC"
        //);
        // 2.7 Set the user name used to access HDFS.
        //System.setProperty("HADOOP_USER_NAME", "atguigu");

        // Step 2: read the page-log topic from Kafka.
        String topic = "dwd_traffic_page_log";
        String groupId = "vccharisnew_page_view_220718";
        DataStreamSource<String> kafkaDS = env.addSource(KafkaUtil.getFlinkKafkaConsumer(topic, groupId));

        // Step 3: parse every record into a JSON object.
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);

        // Step 4: key by device id (common.mid) so the per-device state below is scoped correctly.
        KeyedStream<JSONObject, String> keyedByMidDS = jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"));

        // Step 5: compute the per-record metric contributions and convert to the JavaBean.
        SingleOutputStreamOperator<TrafficPageViewBean> trafficPageViewDS = keyedByMidDS.map(new RichMapFunction<JSONObject, TrafficPageViewBean>() {

            // Date of the most recent visit already counted as a unique visit for the current mid.
            private ValueState<String> lastVisitState;

            @Override
            public void open(Configuration parameters) throws Exception {
                ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("last-visit", String.class);
                // The state is only needed to de-duplicate within a day, so let it expire
                // one day after it was last created or written.
                StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.days(1))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .build();
                stateDescriptor.enableTimeToLive(ttlConfig);
                lastVisitState = getRuntimeContext().getState(stateDescriptor);
            }

            @Override
            public TrafficPageViewBean map(JSONObject value) throws Exception {

                // Last counted visit date from state, and the date of the current record.
                // NOTE(review): the compareTo below assumes DateFormatUtil.toDate yields a
                // lexicographically sortable date string (e.g. yyyy-MM-dd) - confirm.
                String lastDt = lastVisitState.value();
                Long ts = value.getLong("ts");
                String curDt = DateFormatUtil.toDate(ts);

                JSONObject page = value.getJSONObject("page");
                String lastPageId = page.getString("last_page_id");

                // Unique visitor: counts once when there is no stored date or the stored date is
                // earlier than the current record's date.
                long uvCt = 0L;
                if (lastDt == null || lastDt.compareTo(curDt) < 0) {
                    uvCt = 1L;
                    lastVisitState.update(curDt);
                }

                // Session: a page without a preceding page starts a new session.
                long svCt = 0L;
                if (lastPageId == null) {
                    svCt = 1L;
                }

                // A record without during_time must not put a null into the bean: the window
                // reduce adds the durations and would fail the job with a NullPointerException.
                Long duringTime = page.getLong("during_time");
                long durSum = duringTime == null ? 0L : duringTime;

                JSONObject common = value.getJSONObject("common");
                // The two leading empty strings are presumably the window start/end placeholders;
                // they are filled in via setStt/setEdt once the window fires.
                return new TrafficPageViewBean("", "",
                        common.getString("vc"),
                        common.getString("ch"),
                        common.getString("ar"),
                        common.getString("is_new"),
                        uvCt,
                        svCt,
                        1L,
                        durSum,
                        ts);
            }
        });

        // Step 6: extract the event time and generate watermarks (2 seconds of allowed out-of-orderness).
        SingleOutputStreamOperator<TrafficPageViewBean> trafficPageViewWithWMDS = trafficPageViewDS.assignTimestampsAndWatermarks(WatermarkStrategy.<TrafficPageViewBean>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<TrafficPageViewBean>() {
            @Override
            public long extractTimestamp(TrafficPageViewBean element, long recordTimestamp) {
                return element.getTs();
            }
        }));

        // Step 7: key by the four dimensions and open 10-second tumbling event-time windows.
        WindowedStream<TrafficPageViewBean, Tuple4<String, String, String, String>, TimeWindow> windowDS = trafficPageViewWithWMDS.keyBy(new KeySelector<TrafficPageViewBean, Tuple4<String, String, String, String>>() {
            @Override
            public Tuple4<String, String, String, String> getKey(TrafficPageViewBean value) throws Exception {
                return new Tuple4<>(value.getAr(),
                        value.getCh(),
                        value.getIsNew(),
                        value.getVc());
            }
        })
                .window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)));

        // Incremental aggregation (ReduceFunction) keeps only one accumulated bean per window,
        // while the WindowFunction supplies the window metadata when the window fires.
        // Only this single operator is attached to windowDS: every operator created on a stream
        // becomes part of the job graph and is executed, even if its result is never consumed.
        SingleOutputStreamOperator<TrafficPageViewBean> resultDS = windowDS.reduce(new ReduceFunction<TrafficPageViewBean>() {
            @Override
            public TrafficPageViewBean reduce(TrafficPageViewBean value1, TrafficPageViewBean value2) throws Exception {
                value1.setUvCt(value1.getUvCt() + value2.getUvCt());
                value1.setPvCt(value1.getPvCt() + value2.getPvCt());
                value1.setSvCt(value1.getSvCt() + value2.getSvCt());
                value1.setDurSum(value1.getDurSum() + value2.getDurSum());
                return value1;
            }
        }, new WindowFunction<TrafficPageViewBean, TrafficPageViewBean, Tuple4<String, String, String, String>, TimeWindow>() {
            @Override
            public void apply(Tuple4<String, String, String, String> key, TimeWindow window, Iterable<TrafficPageViewBean> input, Collector<TrafficPageViewBean> out) throws Exception {

                // With pre-aggregation the iterable holds exactly one element: the reduced bean.
                TrafficPageViewBean next = input.iterator().next();

                // Replace the event timestamp with the processing time of this result and
                // attach the window boundaries.
                next.setTs(System.currentTimeMillis());
                next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                next.setStt(DateFormatUtil.toYmdHms(window.getStart()));

                out.collect(next);
            }
        });

        // Step 8: print for debugging and write the results to ClickHouse.
        resultDS.print(">>>>>>>>>");
        resultDS.addSink(MyClickHouseUtil.getSinkFunction("insert into dws_traffic_vc_ch_ar_is_new_page_view_window values(?,?,?,?,?,?,?,?,?,?,?)"));

        // Step 9: launch the job.
        env.execute("Dws02TrafficVcChArIsNewPageViewWindow");

    }

}
